package com.gl.sass.mq.producer;

import com.alibaba.fastjson.JSON;
import com.gl.sass.mq.producer.retry.LimitDelayQueue;
import com.gl.sass.mq.producer.retry.RetryMessage;

import lombok.Getter;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Optional;

/**
 * Convenience wrapper around a RocketMQ {@link DefaultMQProducer}.
 *
 * <p>Payload objects are serialized to JSON and sent as the message body. When a
 * {@link LimitDelayQueue} has been supplied via {@link #setLimitDelayQueue}, messages whose
 * synchronous send fails are added to that queue wrapped in a {@link RetryMessage}.
 * Getters for all instance fields are generated by Lombok's {@code @Getter}.
 *
 * @author xiehong
 */
@Getter
public class RocketMQProducer {

    private static final Logger log = LoggerFactory.getLogger(RocketMQProducer.class);

    /** Underlying RocketMQ producer every send is delegated to. */
    private final DefaultMQProducer producer;

    /** Optional queue that receives messages whose synchronous send failed; may be {@code null}. */
    private LimitDelayQueue<RetryMessage> limitDelayQueue;

    /** Whether the send / result / error log helpers of this class emit output. */
    private final boolean logger;

    /**
     * Creates a producer wrapper with logging enabled.
     *
     * @param producer the RocketMQ producer to delegate to
     */
    public RocketMQProducer(DefaultMQProducer producer) {
        this(producer, true);
    }

    /**
     * Creates a producer wrapper.
     *
     * @param producer the RocketMQ producer to delegate to
     * @param logger   {@code true} to log payloads, send results and send failures
     */
    public RocketMQProducer(DefaultMQProducer producer, boolean logger) {
        this.producer = producer;
        this.logger = logger;
    }

    /**
     * Synchronously sends a message without a delay level.
     *
     * @param topic topic
     * @param tag   tag
     * @param obj   message content, serialized to JSON
     * @return the send result, or {@code null} if sending failed
     */
    public SendResult send(String topic, String tag, Object obj) {
        return send(topic, tag, obj, null);
    }

    /**
     * Sends a message one-way without a delay level.
     *
     * @param topic topic
     * @param tag   tag
     * @param obj   message content, serialized to JSON
     */
    public void sendInOneway(String topic, String tag, Object obj) {
        sendInOneway(topic, tag, obj, null);
    }

    /**
     * Synchronously sends a message, optionally with a delay level.
     *
     * <p>A failed send is not propagated: the message is handed to the retry queue (if one is
     * configured), the exception is logged, and {@code null} is returned.
     *
     * @param topic topic
     * @param tag   tag
     * @param obj   message content, serialized to JSON
     * @param level delay level, or {@code null} for no delay:
     *              <pre>
     *              1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     *              1  2   3   4  5  6  7  8  9  10 11 12 13 14  15  16  17 18
     *              </pre>
     * @return the send result, or {@code null} if sending failed
     */
    public SendResult send(String topic, String tag, Object obj, Integer level) {
        Message msg = getMessage(topic, tag, obj);
        if (level != null) {
            msg.setDelayTimeLevel(level);
        }
        SendResult sendResult = null;
        try {
            sendLog(obj);
            sendResult = this.getProducer().send(msg);
            log(sendResult);
        } catch (Exception e) {
            restoreInterrupt(e);
            retryQueue(msg);
            log(e);
        }
        return sendResult;
    }

    /**
     * Sends a message one-way, optionally with a delay level. Failures are only logged;
     * the message is not placed on the retry queue.
     *
     * @param topic topic
     * @param tag   tag
     * @param obj   message content, serialized to JSON
     * @param level delay level, or {@code null} for no delay:
     *              <pre>
     *              1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     *              1  2   3   4  5  6  7  8  9  10 11 12 13 14  15  16  17 18
     *              </pre>
     */
    public void sendInOneway(String topic, String tag, Object obj, Integer level) {
        Message msg = getMessage(topic, tag, obj);
        if (level != null) {
            msg.setDelayTimeLevel(level);
        }
        try {
            this.getProducer().sendOneway(msg);
        } catch (Exception e) {
            restoreInterrupt(e);
            log(e);
        }
    }

    /**
     * Sends an already-built MQ message synchronously, propagating any failure to the caller.
     *
     * @param message the message to send
     * @return the send result returned by the underlying producer
     * @throws Exception whatever the underlying producer throws
     */
    protected SendResult send(Message message) throws Exception {
        return this.getProducer().send(message);
    }

    /**
     * Builds a RocketMQ message whose body is the UTF-8 encoded JSON form of {@code obj}.
     *
     * @param topic topic
     * @param tag   tag
     * @param obj   message content
     * @return the new message
     */
    public Message getMessage(String topic, String tag, Object obj) {
        String body = JSON.toJSONString(obj);
        // Explicit charset: the no-arg getBytes() depends on the platform default encoding.
        return new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Sets the queue that receives messages whose synchronous send failed.
     *
     * @param limitDelayQueue the retry queue, or {@code null} to disable queuing
     */
    public void setLimitDelayQueue(LimitDelayQueue<RetryMessage> limitDelayQueue) {
        this.limitDelayQueue = limitDelayQueue;
    }

    /**
     * Shuts down the underlying producer, if there is one.
     * (The method name's spelling is kept as-is so existing callers keep working.)
     */
    public void destory() {
        Optional.ofNullable(this.getProducer()).ifPresent(DefaultMQProducer::shutdown);
    }

    /**
     * Logs a send failure together with its stack trace, if logging is enabled.
     *
     * @param e the failure
     */
    public void log(Exception e) {
        if (logger) {
            // A trailing Throwable is logged as the exception, never substituted into "{}",
            // so the format string must not contain a placeholder for it.
            log.error("消息发送异常", e);
        }
    }

    /**
     * Logs the result of a completed send, if logging is enabled.
     *
     * @param sendResult the result to log; a {@code null} value is logged as "null"
     */
    public void log(SendResult sendResult) {
        if (logger) {
            log.info("发送消息完毕：sendResult={}", sendResult);
        }
    }

    /**
     * Logs the JSON form of the outgoing payload, if logging is enabled.
     * A {@code null} payload is logged as an empty JSON string.
     */
    private void sendLog(Object object) {
        if (logger) {
            log.info("发送消息：content={}", JSON.toJSONString(Optional.ofNullable(object).orElse("")));
        }
    }

    /**
     * Adds the failed message to the retry queue when one is configured; otherwise does nothing.
     *
     * @param msg the message that could not be sent
     */
    private void retryQueue(Message msg) {
        // Snapshot the field so the null check, size() and add() all hit the same queue instance.
        LimitDelayQueue<RetryMessage> queue = this.limitDelayQueue;
        if (queue == null) {
            return;
        }
        log.warn("消息发送失败，放入重试队列等待稍后重试:{}, 当前队列中待重试数：{}", msg, queue.size());
        queue.add(new RetryMessage(msg));
    }

    /**
     * Re-asserts the current thread's interrupt flag when the caught exception is an
     * {@link InterruptedException}, so callers further up can still observe the interruption.
     */
    private static void restoreInterrupt(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

}
